I. Thống kê cơ bản (Basic Statistics)
Việc tính toán tương quan giữa hai chuỗi dữ liệu là một thao tác phổ biến trong thống kê. Trong spark.ml, bạn có thể tính toán ma trận tương quan cặp giữa nhiều chuỗi dữ liệu. Các phương pháp tương quan được hỗ trợ hiện tại bao gồm tương quan Pearson và Spearman.
Hàm Correlation.corr tính toán ma trận tương quan cho tập dữ liệu đầu vào chứa các vector, sử dụng phương pháp được chỉ định. Kết quả trả về là một DataFrame chứa ma trận tương quan của cột vector.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
# Khởi tạo SparkSession
spark = SparkSession.builder.appName("CorrelationExample").getOrCreate()
# Dữ liệu mẫu
data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
(Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
(Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
(Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
# Tạo DataFrame
df = spark.createDataFrame(data, ["features"])
# Tính hệ số tương quan Pearson
r1 = Correlation.corr(df, "features").head()
print("Ma trận tương quan Pearson:\n", r1[0])
# Tính hệ số tương quan Spearman
r2 = Correlation.corr(df, "features", "spearman").head()
print("Ma trận tương quan Spearman:\n", r2[0])
MLlib cung cấp các công cụ để thực hiện kiểm định giả thuyết, giúp xác định xem một giả thuyết thống kê có thể được chấp nhận hay không dựa trên dữ liệu mẫu.
Kiểm định Chi-Square được sử dụng để kiểm tra sự độc lập giữa các biến hoặc để kiểm tra sự phù hợp của phân phối quan sát với phân phối mong đợi.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest
# Khởi tạo SparkSession
spark = SparkSession.builder.appName("ChiSquareTestExample").getOrCreate()
data = [(0.0, Vectors.dense(0.5, 10.0)),
(0.0, Vectors.dense(1.5, 20.0)),
(1.0, Vectors.dense(1.5, 30.0)),
(0.0, Vectors.dense(3.5, 30.0)),
(0.0, Vectors.dense(3.5, 40.0)),
(1.0, Vectors.dense(3.5, 40.0))]
df = spark.createDataFrame(data, ["label", "features"])
r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))
MLlib cung cấp các công cụ để tính toán các thống kê tóm tắt như trung bình, phương sai, min, max, và norm L2 cho các vector đầu vào.
from pyspark.sql import SparkSession
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
# Khởi tạo SparkSession
spark = SparkSession.builder.appName("ChiSquareTestExample").getOrCreate()
df = spark.createDataFrame([
Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))])
# create summarizer for multiple metrics "mean" and "count"
summarizer = Summarizer.metrics("mean", "count")
# Tính toán thống kê với trọng số
df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)
# Tính toán thống kê không có trọng số
df.select(summarizer.summary(df.features)).show(truncate=False)
# Tính toán thống kê cho một metric đơn lẻ ("mean") có trọng số
df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)
# Tính toán thống kê cho một metric đơn lẻ ("mean") không có trọng số
df.select(Summarizer.mean(df.features)).show(truncate=False)
Trong MLlib, dữ liệu có thể đến từ nhiều nguồn khác nhau. Spark ML hỗ trợ đọc và ghi dữ liệu từ nhiều định dạng phổ biến như Parquet, JSON, CSV, LIBSVM, giúp dễ dàng thao tác với dữ liệu máy học.
Nguồn dữ liệu hình ảnh được sử dụng để tải ảnh từ một thư mục. Nó có thể đọc các tệp ảnh nén (JPEG, PNG, v.v.) và chuyển đổi chúng thành định dạng ảnh thô bằng thư viện ImageIO của Java.
Khi dữ liệu được tải vào Spark, nó sẽ được lưu trong một DataFrame với một cột duy nhất có kiểu StructType mang tên "image", chứa dữ liệu ảnh theo định dạng schema ảnh.
Cấu trúc của cột "image" gồm các trường sau:
Tổ chức thư mục:
from pyspark.sql import SparkSession
# Tạo SparkSession
spark = SparkSession.builder.appName("ImageDataExample").getOrCreate()
# Đọc dữ liệu hình ảnh
image_df = spark.read.format("image").load("file:///home/hadoopvinhtuong/MLib/Data_sources/images")
# file:// là để đọc ảnh được lưu trữ local
# hdfs:// là để đọc ảnh được lưu trữ trên hadoop
# Hiển thị dữ liệu
image_df.select("image.origin", "image.width", "image.height").show(truncate=False)
LIBSVM là định dạng phổ biến trong học máy, thường chứa label (nhãn) và features (vector đặc trưng).
Dữ liệu sau khi được tải vào Spark DataFrame sẽ có hai cột chính:
Tổ chức thư mục:
from pyspark.sql import SparkSession
# Tạo SparkSession
spark = SparkSession.builder.appName("ImageDataExample").getOrCreate(
# Đọc dữ liệu hình ảnh
df = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
df.show(10)
ML Pipelines cung cấp một tập hợp các API cấp cao, thống nhất, được xây dựng trên nền tảng DataFrames, giúp người dùng tạo và tinh chỉnh các pipeline học máy thực tiễn.
Các khái niệm chính trong Pipelines:
Machine learning có thể được áp dụng cho nhiều loại dữ liệu khác nhau, chẳng hạn như vector, văn bản, hình ảnh và dữ liệu có cấu trúc. Để hỗ trợ sự đa dạng này, API của Spark ML sử dụng DataFrame từ Spark SQL.
DataFrame hỗ trợ nhiều kiểu dữ liệu cơ bản và có cấu trúc. Ngoài ra, DataFrame cũng có thể chứa các kiểu vector đặc trưng của ML.
Có thể tạo DataFrame từ một RDD theo cách ngầm định hoặc tường minh.
Trong DataFrame, các cột được đặt tên cụ thể. Ví dụ trong các đoạn code ví dụ, ta có các cột như "text", "features" và "label".
Transformer là một khái niệm trừu tượng bao gồm cả các bộ biến đổi đặc trưng (feature transformers) và các mô hình đã học. Về mặt kỹ thuật, Transformer triển khai phương thức transform(), có nhiệm vụ chuyển đổi một DataFrame thành một DataFrame khác bằng cách thêm một hoặc nhiều cột. Ví dụ:
Estimator là một khái niệm trừu tượng đại diện cho thuật toán học hoặc bất kỳ thuật toán nào cần huấn luyện trên dữ liệu. Về mặt kỹ thuật, Estimator triển khai phương thức fit(), nhận vào một DataFrame và tạo ra một Model (là một Transformer).
Ví dụ, thuật toán Logistic Regression là một Estimator. Khi gọi fit(), nó sẽ huấn luyện và tạo ra một LogisticRegressionModel, vốn là một Model và cũng là một Transformer.
Transformer.transform() và Estimator.fit() đều không có trạng thái (stateless), tức là chúng không lưu trạng thái giữa các lần gọi.Trong machine learning, chúng ta thường cần chạy một chuỗi thuật toán để xử lý và học từ dữ liệu. Ví dụ, quy trình xử lý văn bản có thể bao gồm:
Spark ML biểu diễn quy trình này dưới dạng Pipeline, gồm một chuỗi các PipelineStages (có thể là Transformer hoặc Estimator) thực hiện theo một thứ tự nhất định.
Một Pipeline bao gồm nhiều giai đoạn (stages), mỗi giai đoạn là một Transformer hoặc Estimator. Khi thực thi Pipeline:
fit() sẽ được gọi để huấn luyện mô hình. Sau đó, mô hình này (một Transformer) sẽ được sử dụng để chuyển đổi DataFrame trước khi chuyển tiếp sang giai đoạn tiếp theo.transform() của nó sẽ được gọi trên DataFrame.Giai đoạn Huấn luyện (Training Time)
Các bước thực hiện:
fit(), nó tạo ra một LogisticRegressionModel.fit() chạy, Pipeline tạo ra một PipelineModel, trong đó các Estimator đã trở thành Transformer.Giai đoạn Dự đoán (Test Time)
Các bước dự đoán:
Kết quả cuối cùng là một DataFrame với cột Predictions, chứa nhãn dự đoán cho dữ liệu mới.
fit(), Pipeline tạo ra một PipelineModel, trong đó tất cả Estimator đã trở thành Transformer.PipelineModel.transform(), dữ liệu đi qua toàn bộ quy trình tương tự như lúc huấn luyện để đảm bảo tính nhất quán giữa tập huấn luyện và tập thử nghiệm.Pipeline giúp tự động hóa quy trình tiền xử lý và huấn luyện, giúp mã nguồn trở nên gọn gàng và dễ bảo trì hơn
myHashingTF hai lần trong một Pipeline, nhưng có thể sử dụng hai đối tượng myHashingTF1 và myHashingTF2 riêng biệt.Spark ML cung cấp một API thống nhất để thiết lập tham số:
(tham số, giá trị), dùng để truyền tham số vào Estimator hoặc Transformer.Có hai cách để đặt tham số:
lr.setMaxIter(10) # Đặt số lần lặp tối đa là 10
fit() hoặc transform()paramMap = ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)
Trong thực tế, việc lưu mô hình hoặc Pipeline để sử dụng sau này rất quan trọng. Spark hỗ trợ lưu trữ và tải lại mô hình từ Spark 1.6. Từ Spark 2.3 trở đi, API DataFrame-based trong spark.ml hỗ trợ đầy đủ việc lưu trữ.
Spark ML đảm bảo tương thích ngược cho các Pipeline:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
spark = SparkSession.builder.master("local").appName("Example").getOrCreate()
# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, -1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)
# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())
# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter.
# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # type: ignore
# You can combine paramMaps, which are python dictionaries.
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"}
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2) # type: ignore
# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())
# Prepare test data
test = spark.createDataFrame([
(1.0, Vectors.dense([-1.0, 1.5, 1.3])),
(0.0, Vectors.dense([3.0, 2.0, -0.1])),
(1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
.collect()
for row in result:
print("features=%s, label=%s -> prob=%s, prediction=%s"
% (row.features, row.label, row.myProbability, row.prediction))
LogisticRegression parameters:
Model 1 was fit using parameters:
Model 2 was fit using parameters:
Test:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
spark = SparkSession.builder.master("local").appName("Example").getOrCreate()
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print(
"(%d, %s) --> prob=%s, prediction=%f" % (
rid, text, str(prob), prediction # type: ignore
)
)
kết quả:
Phương pháp vector hóa đặc trưng phổ biến trong khai thác văn bản, phản ánh tầm quan trọng của một từ trong tài liệu thuộc tập hợp.
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
spark = SparkSession.builder.appName("TF-IDF Example").getOrCreate()
sentenceData = spark.createDataFrame([
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
Output:
Mô hình tính toán biểu diễn vector phân tán của từ, giúp các từ tương tự gần nhau trong không gian vector, hỗ trợ trong các ứng dụng xử lý ngôn ngữ tự nhiên.
from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec
spark = SparkSession.builder.appName("Word2Vec Example").getOrCreate()
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for row in result.collect():
text, vector = row
print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
Output:
Chuyển đổi tập hợp tài liệu văn bản thành vector số lượng từ, hữu ích trong việc xây dựng các mô hình học máy.
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer
spark = SparkSession.builder.appName("CountVectorizer Example").getOrCreate()
# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])
# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
result = model.transform(df)
result.show(truncate=False)
Output:
Sử dụng hàm băm để chuyển đổi các đặc trưng thành vector số, giúp giảm kích thước không gian đặc trưng.
from pyspark.sql import SparkSession
from pyspark.ml.feature import FeatureHasher
spark = SparkSession.builder.appName("FeatureHasher Example").getOrCreate()
dataset = spark.createDataFrame([
(2.2, True, "1", "foo"),
(3.3, False, "2", "bar"),
(4.4, False, "3", "baz"),
(5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])
hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
outputCol="features")
featurized = hasher.transform(dataset)
featurized.show(truncate=False)
Output:
Chia văn bản thành các từ hoặc câu dựa trên dấu cách hoặc dấu câu
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
spark = SparkSession.builder.appName("Tokenizer Example").getOrCreate()
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
Output:
Loại bỏ các từ dừng (như "và", "hoặc"). Từ dừng là những từ cần loại trừ khỏi dữ liệu đầu vào, thường là do các từ này xuất hiện thường xuyên và không mang nhiều ý nghĩa
from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover
spark = SparkSession.builder.appName("StopWordsRemover Example").getOrCreate()
sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
Output:
Tạo ra các chuỗi liên tiếp của 'n' từ từ một văn bản, giúp mô hình hiểu ngữ cảnh tốt hơn.
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
spark = SparkSession.builder.appName("NGram Example").getOrCreate()
wordDataFrame = spark.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
Output:
Chuyển đổi các giá trị số thành 0 hoặc 1 dựa trên ngưỡng xác định.
from pyspark.sql import SparkSession
from pyspark.ml.feature import Binarizer
spark = SparkSession.builder.appName("Binarizer Example").getOrCreate()
continuousDataFrame = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ["id", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
Giảm số lượng đặc trưng bằng cách chuyển đổi chúng thành tập hợp các thành phần chính.
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("PCA Example").getOrCreate()
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
Output:
Tạo ra các đặc trưng đa thức từ các đặc trưng ban đầu, mở rộng không gian đặc trưng.
from pyspark.sql import SparkSession
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("PolynomialExpansion Example").getOrCreate()
df = spark.createDataFrame([
(Vectors.dense([2.0, 1.0]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([3.0, -1.0]),)
], ["features"])
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)
polyDF.show(truncate=False)
Output:
Chuyển đổi dữ liệu từ miền thời gian sang miền tần số, hữu ích trong xử lý tín hiệu.
from pyspark.sql import SparkSession
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("DCT Example").getOrCreate()
df = spark.createDataFrame([
(Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
(Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
(Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])
dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")
dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(truncate=False)
output:
Chuyển đổi các giá trị chuỗi thành số nguyên, giúp mô hình học máy xử lý dễ dàng hơn.
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
spark = SparkSession.builder.appName("StringIndexer Example").getOrCreate()
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
Output:
Chuyển đổi các chỉ số số nguyên trở lại giá trị chuỗi ban đầu.
from pyspark.sql import SparkSession
from pyspark.ml.feature import IndexToString, StringIndexer
spark = SparkSession.builder.appName("IndexToString Example").getOrCreate()
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)
print("Transformed string column '%s' to indexed column '%s'"
% (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()
print("StringIndexer will store labels in output column metadata\n")
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
print("Transformed indexed column '%s' back to original string column '%s' using "
"labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
Output:
Mã hóa các giá trị phân loại thành vector nhị phân, giúp mô hình hiểu rõ hơn về các danh mục.
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder
spark = SparkSession.builder.appName("OneHotEncoder Example").getOrCreate()
df = spark.createDataFrame([
(0.0, 1.0),
(1.0, 0.0),
(2.0, 1.0),
(0.0, 2.0),
(0.0, 1.0),
(2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])
encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
Output:
Tự động xác định và mã hóa các đặc trưng phân loại trong vector, giảm công sức tiền xử lý.
[from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorIndexer
spark = SparkSession.builder.appName("OneHotEncoder Example").getOrCreate()
data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
(len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()](<from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorIndexer
spark = SparkSession.builder.appName("VectorIndexer Example").getOrCreate()
data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
(len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()>)
Output:
Tạo ra các đặc trưng tương tác bằng cách kết hợp các đặc trưng hiện có.
from pyspark.sql import SparkSession
from pyspark.ml.feature import Interaction, VectorAssembler
spark = SparkSession.builder.appName("Interaction Example").getOrCreate()
df = spark.createDataFrame(
[(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)],
["id1", "id2", "id3", "id4", "id5", "id6", "id7"])
assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")
assembled1 = assembler1.transform(df)
assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")
assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")
interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")
interacted = interaction.transform(assembled2)
interacted.show(truncate=False)
Output:
Chuẩn hóa các vector đặc trưng về độ dài 1, giúp mô hình không bị ảnh hưởng bởi độ lớn của đặc trưng.
from pyspark.sql import SparkSession
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("Normalizer Example").getOrCreate()
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
Output:
Chuẩn hóa các đặc trưng để có trung bình bằng 0 và độ lệch chuẩn bằng 1, giúp mô hình học hiệu quả hơn
from pyspark.sql import SparkSession
from pyspark.ml.feature import StandardScaler
spark = SparkSession.builder.appName("StandardScaler Example").getOrCreate()
dataFrame = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)
# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Output:
Sử dụng các thống kê bền vững để chuẩn hóa đặc trưng, giảm ảnh hưởng của ngoại lệ.
from pyspark.sql import SparkSession
from pyspark.ml.feature import RobustScaler
spark = SparkSession.builder.appName("RobustScaler Example").getOrCreate()
dataFrame = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
withScaling=True, withCentering=False,
lower=0.25, upper=0.75)
# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)
# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Output:
Tỉ lệ hóa các đặc trưng về khoảng giá trị [0, 1], giúp đồng nhất các đặc trưng.
from pyspark.sql import SparkSession
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("MinMaxScaler Example").getOrCreate()
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -1.0]),),
(1, Vectors.dense([2.0, 1.1, 1.0]),),
(2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
Output:
Tỉ lệ hóa các đặc trưng dựa trên giá trị tuyệt đối lớn nhất, giữ nguyên dấu của đặc trưng.
from pyspark.sql import SparkSession
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("MaxAbsScaler Example").getOrCreate()
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -8.0]),),
(1, Vectors.dense([2.0, 1.0, -4.0]),),
(2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
Output:
Chia các giá trị liên tục thành các nhóm (bucket) dựa trên các ngưỡng xác định.
from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer
spark = SparkSession.builder.appName("Bucketizer Example").getOrCreate()
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()
Output:
Nhân từng phần tử của vector đặc trưng với một vector trọng số, giúp điều chỉnh tầm quan trọng của từng đặc trưng.
from pyspark.sql import SparkSession
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("ElementwiseProduct Example").getOrCreate()
# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
Output:
Áp dụng các phép biến đổi SQL trên DataFrame, linh hoạt trong việc xử lý và biến đổi dữ liệu.
from pyspark.sql import SparkSession
from pyspark.ml.feature import SQLTransformer
spark = SparkSession.builder.appName("SQLTransformer Example").getOrCreate()
df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
Output:
Kết hợp nhiều cột đặc trưng thành một cột vector duy nhất, chuẩn bị dữ liệu cho mô hình học máy.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
spark = SparkSession.builder.appName("VectorAssembler Example").getOrCreate()
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
Output:
Cung cấp thông tin về kích thước của vector đặc trưng, giúp tránh lỗi trong quá trình xử lý.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)
spark = SparkSession.builder.appName("VectorSizeHint Example").getOrCreate()
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
(0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
sizeHint = VectorSizeHint(
inputCol="userFeatures",
handleInvalid="skip",
size=3)
datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
Output:
Chia các giá trị liên tục thành các nhóm dựa trên phân vị, giúp xử lý các đặc trưng không phân phối đều.
from pyspark.sql import SparkSession
from pyspark.ml.feature import QuantileDiscretizer
spark = SparkSession.builder.appName("QuantileDiscretizer Example").getOrCreate()
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
result = discretizer.fit(df).transform(df)
result.show()
Output:
Điền giá trị thiếu trong dữ liệu bằng các chiến lược như trung bình hoặc trung vị, đảm bảo dữ liệu đầy đủ cho mô hình.
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer
spark = SparkSession.builder.appName("Imputer Example").getOrCreate()
df = spark.createDataFrame([
(1.0, float("nan")),
(2.0, float("nan")),
(float("nan"), 3.0),
(4.0, 4.0),
(5.0, 5.0)
], ["a", "b"])
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)
model.transform(df).show()
Output:
Chọn một tập con các đặc trưng từ vector đặc trưng ban đầu dựa trên chỉ số, giúp giảm kích thước không gian đặc trưng.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
spark = SparkSession.builder.appName("VectorSlicer Example").getOrCreate()
df = spark.createDataFrame([
Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])
output = slicer.transform(df)
output.select("userFeatures", "features").show()
Output:
Sử dụng cú pháp R để chỉ định các đặc trưng và nhãn, đơn giản hóa việc xây dựng mô hình.
from pyspark.sql import SparkSession
from pyspark.ml.feature import RFormula
spark = SparkSession.builder.appName("VectorSlicer Example").getOrCreate()
dataset = spark.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])
formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
Output:
Sử dụng kiểm định Chi-squared để chọn các đặc trưng quan trọng nhất, hữu ích trong các bài toán phân loại.
from pyspark.sql import SparkSession
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("ChiSqSelector Example").getOrCreate()
df = spark.createDataFrame([
(7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
(8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
(9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
outputCol="selectedFeatures", labelCol="clicked")
result = selector.fit(df).transform(df)
print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
Output:
Chọn các đặc trưng dựa trên các thống kê đơn biến, giúp loại bỏ các đặc trưng không liên quan.
from pyspark.sql import SparkSession
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("UnivariateFeatureSelector Example").getOrCreate()
df = spark.createDataFrame([
(1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
(2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
(3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
(4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
(5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
(6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])
selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
labelCol="label", selectionMode="numTopFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)
result = selector.fit(df).transform(df)
print("UnivariateFeatureSelector output with top %d features selected using f_classif"
% selector.getSelectionThreshold())
result.show()
Output:
Loại bỏ các đặc trưng có phương sai thấp, thường không mang nhiều thông tin cho mô hình.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VarianceThresholdSelector
from pyspark.ml.linalg import Vectors
spark = SparkSession.builder.appName("VarianceThresholdSelector Example").getOrCreate()
df = spark.createDataFrame([
(1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),
(2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),
(3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),
(4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),
(5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),
(6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])
selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")
result = selector.fit(df).transform(df)
print("Output: Features with variance lower than %f are removed." %
selector.getVarianceThreshold())
result.show()
Output:
The Euclidean distance is defined:
Hash bucket:
Code python:
from pyspark.sql import SparkSession
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("VarianceThresholdSelector Example").getOrCreate()
dataA = [(0, Vectors.dense([1.0, 1.0]),),
(1, Vectors.dense([1.0, -1.0]),),
(2, Vectors.dense([-1.0, -1.0]),),
(3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(4, Vectors.dense([1.0, 0.0]),),
(5, Vectors.dense([-1.0, 0.0]),),
(6, Vectors.dense([0.0, 1.0]),),
(7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.dense([1.0, 0.0])
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
numHashTables=3)
model = brp.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Output:
MinHash:
Code python
from pyspark.sql import SparkSession
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("VarianceThresholdSelector Example").getOrCreate()
dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
(1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
(2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
(4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
(5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.sparse(6, [1, 3], [1.0, 1.0])
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Output:
Phân loại và hồi quy là hai nhiệm vụ chính trong học máy:
Dùng để dự đoán xác suất của một nhãn phân loại. Spark hỗ trợ cả hồi quy logistic nhị phân và đa thức.
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
spark = SparkSession.builder.appName("logistic Example").getOrCreate()
# Load training data
training = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")
# Fit the model
mlrModel = mlr.fit(training)
# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))
Output:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
spark = SparkSession.builder.appName("logistic Example").getOrCreate()
# Load training data
training = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")
# Fit the model
mlrModel = mlr.fit(training)
# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary
# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
print(objective)
# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))
# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
.select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)
Output:
The conditional probabilities of the outcome classes
We minimize the weighted negative log-likelihood, using a multinomial response model, with elastic-net penalty to control for overfitting
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
spark = SparkSession.builder.appName("logistic Example").getOrCreate()
# Load training data
training = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))
trainingSummary = lrModel.summary
# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
print(objective)
# for multiclass, we can inspect metrics on a per-label basis
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
print("label %d: %s" % (i, rate))
print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
print("label %d: %s" % (i, rate))
print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
print("label %d: %s" % (i, prec))
print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
print("label %d: %s" % (i, rec))
print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
print("label %d: %s" % (i, f))
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
% (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))
Output:
Xây dựng mô hình phân loại dựa trên việc chia dữ liệu thành các nhóm con dựa trên các đặc trưng.
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = SparkSession.builder.appName("Class Example").getOrCreate()
# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
treeModel = model.stages[2]
# summary only
print(treeModel)
Output:
Sử dụng nhiều cây quyết định để cải thiện độ chính xác và giảm thiểu overfitting.
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = SparkSession.builder.appName("Class Example").getOrCreate()
# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
labels=labelIndexer.labels)
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
rfModel = model.stages[2]
print(rfModel) # summary only
Output:
Xây dựng mô hình bằng cách kết hợp nhiều cây quyết định được huấn luyện tuần tự, mỗi cây cố gắng sửa lỗi của cây trước đó.
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = SparkSession.builder.appName("Train-Validation split Example").getOrCreate()
# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)
# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
gbtModel = model.stages[2]
print(gbtModel) # summary only
Output:
Tìm siêu phẳng phân tách các lớp trong không gian đặc trưng.
from pyspark.sql import SparkSession
from pyspark.ml.classification import LinearSVC
spark = SparkSession.builder.appName("Train-Validation split Example").getOrCreate()
# Load the data stored in LIBSVM format as a DataFrame.
training = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
lsvc = LinearSVC(maxIter=10, regParam=0.1)
# Fit the model
lsvcModel = lsvc.fit(training)
# Print the coefficients and intercept for linear SVC
print("Coefficients: " + str(lsvcModel.coefficients))
print("Intercept: " + str(lsvcModel.intercept))
Output:
Dựa trên định lý Bayes và giả định độc lập giữa các đặc trưng.
from pyspark.sql import SparkSession
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = SparkSession.builder.appName("Class Example").getOrCreate()
# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
# train the model
model = nb.fit(train)
# select example rows to display.
predictions = model.transform(test)
predictions.show()
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
Output:
Mạng nơ-ron nhiều lớp cho các nhiệm vụ phân loại.
Sigmoid:
Softmax func:
from pyspark.sql import SparkSession
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = SparkSession.builder.appName("Class Example").getOrCreate()
# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_multiclass_classification_data.txt")
# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [4, 5, 4, 3]
# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
# train the model
model = trainer.fit(train)
# compute accuracy on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
Output:
Chiến lược phân loại đa lớp bằng cách huấn luyện một mô hình nhị phân cho mỗi lớp.
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = SparkSession.builder.appName("Class Example").getOrCreate()
# Load the data stored in LIBSVM format as a DataFrame.
inputData = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_multiclass_classification_data.txt")
# generate the train/test split.
(train, test) = inputData.randomSplit([0.8, 0.2])
# instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)
# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)
# train the multiclass model.
ovrModel = ovr.fit(train)
# score the model on test data.
predictions = ovrModel.transform(test)
# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
Output:
Kết hợp giữa hồi quy tuyến tính và mô hình tương tác bậc hai để xử lý dữ liệu thưa và có tính tương tác cao.
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
spark = SparkSession.builder.appName("Class Example").getOrCreate()
# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Scale features.
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a FM model.
fm = FMClassifier(labelCol="indexedLabel", featuresCol="scaledFeatures", stepSize=0.001)
# Create a Pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureScaler, fm])
# Train model.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
# Select (prediction, true label) and compute test accuracy
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = %g" % accuracy)
fmModel = model.stages[2]
print("Factors: " + str(fmModel.factors)) # type: ignore
print("Linear: " + str(fmModel.linear)) # type: ignore
print("Intercept: " + str(fmModel.intercept)) # type: ignore
Output:
Mô hình hóa mối quan hệ giữa biến phụ thuộc và một hoặc nhiều biến độc lập bằng đường thẳng.
Mở rộng hồi quy tuyến tính để hỗ trợ các phân phối khác nhau của biến phụ thuộc.
Sử dụng cây quyết định để dự đoán giá trị liên tục.
Sử dụng nhiều cây quyết định để dự đoán giá trị liên tục, cải thiện độ chính xác và giảm thiểu overfitting.
Kết hợp nhiều cây quyết định được huấn luyện tuần tự để dự đoán giá trị liên tục.
Mô hình hóa thời gian cho đến khi xảy ra một sự kiện quan tâm, thường được sử dụng trong phân tích sống sót.
Mô hình hồi quy đơn điệu, đảm bảo rằng dự đoán không giảm hoặc không tăng theo thứ tự của biến độc lập.
Kết hợp giữa hồi quy tuyến tính và mô hình tương tác bậc hai để xử lý dữ liệu thưa và có tính tương tác cao.
Phương pháp tuyến tính là nền tảng của nhiều thuật toán phân loại và hồi quy, bao gồm hồi quy logistic và hồi quy tuyến tính. Chúng giả định mối quan hệ tuyến tính giữa các đặc trưng và biến mục tiêu.
K-means là một trong những thuật toán phân cụm phổ biến nhất, nhóm các điểm dữ liệu thành số cụm được xác định trước. MLlib triển khai một biến thể song song của phương pháp k-means++ gọi là kmeans.
Các cột đầu vào:
Các cột đầu ra:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
spark = SparkSession.builder.appName("logistic Example").getOrCreate()
# Loads data.
dataset = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_kmeans_data.txt")
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# Make predictions
predictions = model.transform(dataset)
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
print(center)
Output:
LDA là một thuật toán phân cụm chủ yếu được sử dụng cho mô hình chủ đề trong xử lý ngôn ngữ tự nhiên. Nó nhóm các tài liệu thành các chủ đề dựa trên phân phối từ ngữ.
from pyspark.sql import SparkSession
from pyspark.ml.clustering import LDA
spark = SparkSession.builder.appName("logistic Example").getOrCreate()
# Loads data.
dataset = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_lda_libsvm_data.txt")
# Trains a LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)
ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))
# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)
# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)
Output:
Bisecting k-means là một biến thể của k-means, hoạt động bằng cách chia đôi các cụm lặp đi lặp lại cho đến khi đạt được số lượng cụm mong muốn. Phương pháp này thường dẫn đến các cụm có chất lượng cao hơn và ổn định hơn so với k-means truyền thống.
from pyspark.sql import SparkSession
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
spark = SparkSession.builder.appName("logistic Example").getOrCreate()
# Loads data.
dataset = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_kmeans_data.txt")
# Trains a bisecting k-means model.
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)
# Make predictions
predictions = model.transform(dataset)
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Shows the result.
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
print(center)
Ouput:
GMM giả định rằng dữ liệu được tạo ra từ sự kết hợp của nhiều phân phối Gaussian. Thuật toán này ước lượng các tham số của các phân phối Gaussian thành phần để mô hình hóa dữ liệu.
Các cột đầu vào:
Các cột đầu ra:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import GaussianMixture
spark = SparkSession.builder.appName("logistic Example").getOrCreate()
# Loads data.
dataset = spark.read.format("libsvm").load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_kmeans_data.txt")
gmm = GaussianMixture().setK(2).setSeed(538009335)
model = gmm.fit(dataset)
print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(truncate=False)
Output:
Power Iteration Clustering (PIC) là một thuật toán phân cụm trên đồ thị có khả năng mở rộng, được phát triển bởi Lin và Cohen. Thuật toán này tìm một ánh xạ không gian có số chiều rất thấp cho dữ liệu bằng cách sử dụng phép lặp lũy thừa bị cắt trên ma trận tương đồng chuẩn hóa của dữ liệu.
Các tham số trong PowerIterationClustering của spark.ml
k: Số lượng cụm cần tạo.initMode: Phương pháp khởi tạo thuật toán.maxIter: Số lần lặp tối đa.srcCol: Tên cột chứa ID đỉnh nguồn trong đồ thị đầu vào.dstCol: Tên cột chứa ID đỉnh đích trong đồ thị đầu vào.weightCol: Tên cột chứa trọng số cạnh giữa các đỉnh.from pyspark.sql import SparkSession
from pyspark.ml.clustering import PowerIterationClustering
spark = SparkSession.builder.appName("logistic Example").getOrCreate()
df = spark.createDataFrame([
(0, 1, 1.0),
(0, 2, 1.0),
(1, 2, 1.0),
(3, 4, 1.0),
(4, 0, 0.1)
], ["src", "dst", "weight"])
pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")
# Shows the cluster assignment
pic.assignClusters(df).show()
Output:
Lọc cộng tác (Collaborative Filtering) là một kỹ thuật phổ biến trong hệ thống gợi ý, nhằm dự đoán sở thích của người dùng dựa trên hành vi của họ và những người dùng tương tự. Trong Spark MLlib, lọc cộng tác được thực hiện thông qua phương pháp phân rã ma trận dựa trên mô hình, trong đó người dùng và sản phẩm được biểu diễn bằng một tập hợp nhỏ các yếu tố tiềm ẩn, giúp dự đoán các mục còn thiếu trong ma trận người dùng-sản phẩm. Thuật toán được sử dụng để học các yếu tố tiềm ẩn này là Alternating Least Squares (ALS).
Các tham số chính trong ALS:
numBlocks: Số lượng khối mà người dùng và sản phẩm sẽ được chia để tính toán song song (mặc định là 10).rank: Số lượng yếu tố tiềm ẩn trong mô hình (mặc định là 10).maxIter: Số lần lặp tối đa (mặc định là 10).regParam: Tham số điều chuẩn trong ALS (mặc định là 1.0).implicitPrefs: Xác định việc sử dụng biến thể ALS cho phản hồi tường minh hay phản hồi ngầm định (mặc định là false, tức là sử dụng phản hồi tường minh).alpha: Tham số áp dụng cho biến thể ALS với phản hồi ngầm định, điều chỉnh mức độ tin cậy cơ bản trong quan sát sở thích (mặc định là 1.0).nonnegative: Xác định có sử dụng ràng buộc không âm cho bình phương tối thiểu hay không (mặc định là false).Phản hồi tường minh và phản hồi ngầm định:
Quy mô của tham số điều chuẩn:
Trong quá trình cập nhật các yếu tố người dùng và sản phẩm, tham số điều chuẩn regParam được chia tỷ lệ dựa trên số lượng đánh giá mà người dùng hoặc sản phẩm nhận được. Phương pháp này, được gọi là "ALS-WR", giúp regParam ít phụ thuộc hơn vào quy mô của tập dữ liệu, cho phép áp dụng tham số tốt nhất học được từ một tập con mẫu vào toàn bộ tập dữ liệu.
Chiến lược khởi động lạnh (Cold-start):
Trong hệ thống gợi ý, vấn đề khởi động lạnh xảy ra khi có người dùng hoặc sản phẩm mới mà không có đánh giá trước đó. Để giải quyết vấn đề này, mô hình ALS trong Spark cung cấp tham số coldStartStrategy. Khi được đặt là "drop", bất kỳ dự đoán nào chứa người dùng hoặc sản phẩm chưa thấy sẽ bị loại bỏ khỏi tập kết quả, đảm bảo không có giá trị dự đoán không xác định trong đánh giá mô hình.
Lưu ý: API dựa trên DataFrame cho ALS hiện chỉ hỗ trợ các giá trị số nguyên cho ID người dùng và sản phẩm. Các kiểu số khác có thể được sử dụng cho các cột ID này, nhưng giá trị ID phải nằm trong phạm vi của kiểu số nguyên.
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
spark = SparkSession.builder.appName("logistic Example").getOrCreate()
# Loads data.
lines = spark.read.text("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
output:
Thuật toán FP-Growth, được mô tả trong bài báo của Han và cộng sự, là một phương pháp hiệu quả để tìm các tập mục thường xuyên mà không cần tạo ra các tập ứng viên, giúp giảm chi phí tính toán. Trong Spark, một phiên bản song song của FP-Growth, gọi là PFP (Parallel FP-Growth), đã được triển khai, giúp mở rộng khả năng xử lý trên các tập dữ liệu lớn.
Các tham số chính trong triển khai FP-Growth của spark.ml:
minSupport: Ngưỡng hỗ trợ tối thiểu để một tập mục được coi là thường xuyên. Ví dụ, nếu một mục xuất hiện trong 3 trên 5 giao dịch, nó có hỗ trợ là 3/5 = 0.6.minConfidence: Ngưỡng độ tin cậy tối thiểu để tạo ra luật kết hợp. Độ tin cậy cho biết mức độ thường xuyên mà một luật kết hợp được tìm thấy là đúng. Ví dụ, nếu tập mục X xuất hiện 4 lần trong các giao dịch và X cùng Y xuất hiện cùng nhau 2 lần, độ tin cậy cho luật X => Y là 2/4 = 0.5. Tham số này không ảnh hưởng đến việc khai thác các tập mục thường xuyên, nhưng xác định độ tin cậy tối thiểu để tạo ra các luật kết hợp từ các tập mục thường xuyên.numPartitions: Số lượng phân vùng được sử dụng để phân phối công việc. Mặc định, tham số này không được đặt và số lượng phân vùng của tập dữ liệu đầu vào sẽ được sử dụng.FPGrowthModel cung cấp:
freqItemsets: Các tập mục thường xuyên dưới dạng DataFrame với các cột:
items: array: Tập mục cụ thể.freq: long: Số lần tập mục này xuất hiện, dựa trên các tham số mô hình đã cấu hình.associationRules: Các luật kết hợp được tạo ra với độ tin cậy trên minConfidence, dưới dạng DataFrame với các cột:
antecedent: array: Tập mục giả thuyết của luật kết hợp.consequent: array: Tập mục kết luận của luật kết hợp, luôn chứa một phần tử duy nhất.confidence: double: Độ tin cậy của luật kết hợp.lift: double: Một thước đo cho biết mức độ mà giả thuyết dự đoán kết luận, được tính bằng công thức support(antecedent U consequent) / (support(antecedent) x support(consequent)).support: double: Tỷ lệ xuất hiện của cả giả thuyết và kết luận trong tập dữ liệu.Output:
Thuật toán PrefixSpan (Prefix-projected Sequential pattern mining) được sử dụng để khai thác các mẫu tuần tự thường xuyên trong tập dữ liệu. Trong Spark, thuật toán này được triển khai để tìm các chuỗi con xuất hiện thường xuyên trong các chuỗi dài hơn, hữu ích trong các ứng dụng như phân tích hành vi người dùng hoặc phát hiện mẫu trong dữ liệu thời gian.
Apache Spark cung cấp một triển khai thuật toán PrefixSpan trong spark.ml để khai thác các mẫu tuần tự thường xuyên. Dưới đây là mô tả chi tiết về các tham số quan trọng của thuật toán này:
minSupport:
maxPatternLength:
maxLocalProjDBSize:
sequenceCol:
"sequence").null trong cột này sẽ bị bỏ qua.from pyspark.sql import SparkSession
from pyspark.ml.fpm import PrefixSpan
from pyspark.sql import Row
spark = SparkSession.builder.appName("PrefixSpan Example").getOrCreate()
df = spark.createDataFrame([Row(sequence=[[1, 2], [3]]),
Row(sequence=[[1], [3, 2], [1, 2]]),
Row(sequence=[[1, 2], [5]]),
Row(sequence=[[6]])])
prefixSpan = PrefixSpan(minSupport=0.5, maxPatternLength=5,
maxLocalProjDBSize=32000000)
# Find frequent sequential patterns.
prefixSpan.findFrequentSequentialPatterns(df).show()
Output:
Tối ưu hóa Máy học: Lựa chọn mô hình và điều chỉnh siêu tham số là một phần quan trọng trong quy trình phát triển mô hình, giúp tìm ra mô hình hoặc các tham số tốt nhất cho một nhiệm vụ cụ thể. Apache Spark cung cấp các công cụ như CrossValidator và TrainValidationSplit để hỗ trợ quá trình này.
Trong máy học, lựa chọn mô hình liên quan đến việc sử dụng dữ liệu để tìm ra mô hình hoặc các tham số tối ưu cho một nhiệm vụ cụ thể. Quá trình này có thể được thực hiện cho từng Estimator riêng lẻ như LogisticRegression, hoặc cho toàn bộ Pipeline bao gồm nhiều thuật toán, bước tiền xử lý và các bước khác. Spark cho phép người dùng điều chỉnh toàn bộ Pipeline cùng một lúc, thay vì điều chỉnh từng thành phần riêng lẻ.
Các công cụ như CrossValidator và TrainValidationSplit yêu cầu các thành phần sau:
Quy trình hoạt động chung của các công cụ lựa chọn mô hình như sau:
Evaluator có thể là RegressionEvaluator cho các bài toán hồi quy, BinaryClassificationEvaluator cho dữ liệu nhị phân, MulticlassClassificationEvaluator cho các bài toán phân loại đa lớp, MultilabelClassificationEvaluator cho phân loại đa nhãn, hoặc RankingEvaluator cho các bài toán xếp hạng. Chỉ số mặc định được sử dụng để chọn ParamMap tốt nhất có thể được ghi đè bằng phương thức setMetricName trong mỗi Evaluator này.
Để xây dựng lưới tham số, người dùng có thể sử dụng tiện ích ParamGridBuilder. Mặc định, các tập hợp tham số từ lưới tham số được đánh giá tuần tự. Việc đánh giá tham số có thể được thực hiện song song bằng cách đặt parallelism với giá trị từ 2 trở lên (giá trị 1 sẽ là tuần tự) trước khi chạy lựa chọn mô hình với CrossValidator hoặc TrainValidationSplit. Giá trị của parallelism nên được chọn cẩn thận để tối đa hóa khả năng song song mà không vượt quá tài nguyên của cụm, và các giá trị lớn hơn không phải lúc nào cũng dẫn đến hiệu suất được cải thiện. Thông thường, giá trị lên đến 10 là đủ cho hầu hết các cụm.
CrossValidator bắt đầu bằng cách chia tập dữ liệu thành một tập hợp các folds được sử dụng như các tập huấn luyện và kiểm tra riêng biệt. Ví dụ, với
Để minh họa, giả sử chúng ta có một Pipeline với các bước:
Chúng ta có thể sử dụng ParamGridBuilder để xây dựng lưới tham số cho HashingTF và LogisticRegression, sau đó sử dụng CrossValidator để tìm tập tham số tốt nhất dựa trên chỉ số đánh giá.
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
spark = SparkSession.builder.appName("Cross-Validation Example").getOrCreate()
# Prepare training documents, which are labeled.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
(8, "e spark program", 1.0),
(9, "a e c l", 0.0),
(10, "spark compile", 1.0),
(11, "hadoop software", 0.0)
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=2) # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)
# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "mapreduce spark"),
(7, "apache hadoop")
], ["id", "text"])
# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
print(row)
Output:
TrainValidationSplit là một phương pháp khác để điều chỉnh siêu tham số, hoạt động tương tự như CrossValidator, nhưng chỉ chia tập dữ liệu một lần thành tập huấn luyện và kiểm tra. Phương pháp này thường nhanh hơn CrossValidator, nhưng có thể ít chính xác hơn do chỉ sử dụng một lần chia dữ liệu.
Ví dụ, chúng ta có thể sử dụng TrainValidationSplit với cùng Pipeline và lưới tham số như trên, nhưng chỉ chia tập dữ liệu một lần và tìm tập tham số tốt nhất dựa trên chỉ số đánh giá.
Cả CrossValidator và TrainValidationSplit đều trả về một mô hình đã được huấn luyện với tập tham số tốt nhất, có thể được sử dụng để dự đoán trên dữ liệu mới.
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
spark = SparkSession.builder.appName("Train-Validation split Example").getOrCreate()
# Prepare training and test data.
data = spark.read.format("libsvm")\
.load("file:///home/hadoopvinhtuong/MLib/Data_sources/sample_linear_regression_data.txt")
train, test = data.randomSplit([0.9, 0.1], seed=12345)
lr = LinearRegression(maxIter=10)
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
.addGrid(lr.regParam, [0.1, 0.01]) \
.addGrid(lr.fitIntercept, [False, True])\
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
.build()
# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
estimatorParamMaps=paramGrid,
evaluator=RegressionEvaluator(),
# 80% of the data will be used for training, 20% for validation.
trainRatio=0.8)
# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)
# Make predictions on test data. model is the model with combination of parameters
# that performed best.
model.transform(test)\
.select("features", "label", "prediction")\
.show()
Output:
L-BFGS là một thuật toán tối ưu hóa thuộc họ các phương pháp quasi-Newton, được sử dụng để giải các bài toán tối ưu hóa dạng:
Phương pháp L-BFGS xấp xỉ hàm mục tiêu cục bộ như một hàm bậc hai mà không cần tính toán ma trận Hessian (ma trận đạo hàm bậc hai). Thay vào đó, ma trận Hessian được xấp xỉ bằng các đánh giá gradient trước đó, giúp tránh vấn đề về khả năng mở rộng theo chiều dọc khi số lượng đặc trưng lớn. Do đó, L-BFGS thường đạt được hội tụ nhanh hơn so với các phương pháp tối ưu hóa bậc nhất khác.
OWL-QN (Orthant-Wise Limited-memory Quasi-Newton) là một mở rộng của L-BFGS, có thể xử lý hiệu quả việc regularization L1 và elastic net.
Trong MLlib, L-BFGS được sử dụng làm solver cho các thuật toán như LinearRegression, LogisticRegression, AFTSurvivalRegression và MultilayerPerceptronClassifier.
MLlib triển khai trình giải phương trình chuẩn cho bình phương tối thiểu có trọng số thông qua lớp WeightedLeastSquares.
Với
Số lượng đặc trưng cho mỗi quan sát là
trong đó:
Hàm mục tiêu này chỉ yêu cầu một lần duyệt qua dữ liệu để thu thập các thống kê cần thiết. Đối với ma trận dữ liệu kích thước
Hiện tại, Spark MLlib hỗ trợ hai loại solver cho các phương trình chuẩn: phân tích Cholesky và các phương pháp Quasi-Newton (L-BFGS/OWL-QN). Phân tích Cholesky phụ thuộc vào ma trận hiệp phương sai xác định dương (tức là các cột của ma trận dữ liệu phải độc lập tuyến tính) và sẽ thất bại nếu điều kiện này bị vi phạm.